#include "config.h"

#include <errno.h>

#include "sysy_lib.h"
#include "hash_table.h"
#include "lba_lock.h"
#include "plock.h"

/*
 * One entry of the LBA lock table: a lock object keyed by an LBA value.
 * Entries are allocated on first lock of an LBA and freed on unlock once
 * the lock is observed to be idle.
 */
typedef struct 
{
        hashtable_t ht;         /* table this entry was inserted into */
        uint64_t lba;           /* key; its address is handed to the hash table on insert */
        plock_t lock;           /* lock taken in shared or exclusive mode for this key */
} lba_lock_entry_t;

/*
 * Hash callback for the LBA lock table.
 *
 * @k points at a uint64_t key. The result is the key reduced modulo
 * 1048576 (2^20), i.e. its low 20 bits.
 */
static uint32_t __lba_lock_key_func(const void *k) 
{
        const uint64_t key = *(const uint64_t *)k;

        /* 0xFFFFF == 1048576 - 1; masking equals "% 1048576" for unsigned values */
        return (uint32_t)(key & UINT64_C(0xFFFFF));
}

/*
 * Comparison callback for the LBA lock table.
 *
 * @s1 points at a stored lba_lock_entry_t, @s2 at a uint64_t key.
 * Returns -1 if the entry's lba is below the key, 1 if above, 0 if equal.
 */
static int __lba_lock_cmp_func(const void *s1, const void *s2) 
{
        const uint64_t stored = ((const lba_lock_entry_t *)s1)->lba;
        const uint64_t wanted = *(const uint64_t *)s2;

        /* (a > b) - (a < b) yields exactly -1, 0 or 1 */
        return (stored > wanted) - (stored < wanted);
}

/*
 * Create the hash table that backs the LBA locks.
 *
 * @ht  out-parameter; set to the new table on success and left NULL on
 *      failure.
 *
 * Returns 0 on success, or ENOMEM when hash_create_table() hands back a
 * NULL table.
 */
int lba_lock_init(hashtable_t *ht) 
{
        hashtable_t _ht;

        *ht = NULL;

        _ht = hash_create_table(__lba_lock_cmp_func, __lba_lock_key_func, "LBA");
        /*
         * NOTE(review): assumes hash_create_table() signals failure with a
         * NULL handle -- confirm against hash_table.h. Without this check a
         * failed creation was reported as success and later lock calls
         * would operate on a NULL table.
         */
        if (unlikely(_ht == NULL))
                return ENOMEM;

        *ht = _ht;
        return 0;
}

/*
 * Tear down an LBA lock table.
 *
 * Currently a stub: the table is not released and 0 is always returned.
 */
int lba_lock_destroy(hashtable_t ht) 
{
        /* TODO: release the table, e.g. via hash_destroy_table() */
        (void)ht;

        return 0;
}

/*
 * Take the lock associated with @lba, creating its table entry on demand.
 *
 * @ht         lock table created by lba_lock_init()
 * @lba        key identifying the lock
 * @is_shared  non-zero takes the lock with plock_rdlock(), zero with
 *             plock_wrlock()
 *
 * Always returns 0. A failed allocation or a failed insert trips
 * YASSERT(0) instead of being reported to the caller.
 */
static int lba_lock_lock(hashtable_t ht, uint64_t lba, int is_shared) 
{
        int ret;
        lba_lock_entry_t *e;
        char *pname = NULL;
#if LOCK_DEBUG
        char lname[MAX_LOCK_NAME];

        pname = lname;
        /*
         * Bounded write, and a conversion that matches the argument:
         * "%lx" with a uint64_t is a mismatch wherever long is 32 bits.
         */
        snprintf(lname, sizeof(lname), "lba_lock.%llx", (LLU)lba);
#endif

        // Kept general on purpose: lba may be used as an arbitrary key, so
        // the application-specific page-alignment check stays disabled:
        // YASSERT(lba % LSV_PAGE_SIZE == 0);

        e = hash_table_find(ht, (void *)&lba);
        if (e == NULL) 
        {
                /* first locker of this key: allocate and publish a new entry */
                ret = ymalloc((void **)&e, sizeof(lba_lock_entry_t));
                if (unlikely(ret))
                        YASSERT(0);

                e->ht = ht;
                e->lba = lba;
                plock_init(&e->lock, pname);

                /* the key pointer refers to the entry's own lba field */
                ret = hash_table_insert(ht, (void *)e, &e->lba, 0);
                if (unlikely(ret)) 
                {
                        YASSERT(0);
                }
        }

        YASSERT(e != NULL);

        if (is_shared) 
        {
                plock_rdlock(&e->lock);
        }
        else 
        {
                plock_wrlock(&e->lock);
        }

        DINFO_NOP("key %llu shared %u e %p total %u\n",
              (LLU)lba,
              is_shared,
              e,
              ht->num_of_entries);

        return 0;
}

/*
 * Take the lock for @lba in exclusive (write) mode.
 * Returns whatever lba_lock_lock() returns.
 */
int lba_lock_wrlock(hashtable_t ht, uint64_t lba) 
{
        const int shared = 0;

        return lba_lock_lock(ht, lba, shared);
}

/*
 * Take the lock for @lba in shared (read) mode.
 * Returns whatever lba_lock_lock() returns.
 */
int lba_lock_rdlock(hashtable_t ht, uint64_t lba) 
{
        const int shared = 1;

        return lba_lock_lock(ht, lba, shared);
}

/*
 * Destroy the lock inside a table entry and free the entry itself.
 *
 * @arg  the lba_lock_entry_t to release; must not be NULL.
 *
 * Always returns 0.
 */
static inline int __lba_lock_free_func(void *arg) 
{
        lba_lock_entry_t *e = arg;

        /* validate before the first dereference (the log line reads e->lba) */
        YASSERT(e != NULL);

        DINFO_NOP("free key %llu\n", (LLU)e->lba);

        plock_destroy(&e->lock);
        yfree((void **)&e);

        return 0;
}

/*
 * Release the lock held on @lba and drop its table entry once idle.
 *
 * @ht   lock table the lock was taken from
 * @lba  key that was previously locked; its entry must exist
 *
 * After unlocking, the entry is removed from the table and freed when the
 * lock shows writer == -1, readers == 0 and an empty queue.
 * NOTE(review): writer == -1 presumably means "no writer" -- confirm
 * against plock.h.
 *
 * Always returns 0.
 */
int lba_lock_unlock(hashtable_t ht, uint64_t lba) 
{
        lba_lock_entry_t *e;
        /*
         * Start from NULL: the result of hash_table_remove() is not checked,
         * so the out-parameter must hold a defined value even if the call
         * leaves it untouched.
         */
        lba_lock_entry_t *e2 = NULL;

        e = hash_table_find(ht, (const void *)&lba);
        YASSERT(e != NULL);

        DINFO_NOP("unlock key %llu\n", (LLU)lba);

        plock_unlock(&e->lock);

        DINFO_NOP("writer %d readers %d queue empty %d total %u\n",
                      e->lock.writer,
                      e->lock.readers,
                      list_empty_careful(&e->lock.queue),
                      ht->num_of_entries);
        if (e->lock.writer == -1 && e->lock.readers == 0 && list_empty_careful(&e->lock.queue))
        {
                /* the lock looks idle: retire the entry */
                hash_table_remove(ht, &lba, (void **)&e2);
                YASSERT(e2 == e);
                if (e2 != NULL) 
                {
                        YASSERT(e2->ht == ht);
                        YASSERT(e2->lba == lba);
                        __lba_lock_free_func(e2);
                }
        }

        return 0;
}
